package cn.inspur.dataimport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class JDConsumer_jd_phone_list {
    public static void main(String[] args){

        Properties cou = new Properties();
        cou.setProperty("bootstrap.servers", "nd11:9092,nd12:9092,nd13:9092");
        cou.setProperty("group.id","group1");
        cou.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cou.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(cou);
        consumer.subscribe(Collections.singletonList("jd_phone_list"));
        FSDataOutputStream fos=null;
        try{
            Configuration conf = new Configuration();

            conf.set("fs.defaultFS","hdfs://nd11:9000");

            FileSystem fs = FileSystem.get(conf);
            fos = fs.create(new Path("/e-commerce/jd_phone_list.txt"));

            int cnt = 0;
            while(true){
                ConsumerRecords<String,String> records = consumer.poll(100);
                for (ConsumerRecord<String,String> r:records){
                    String value = r.value();
                    fos.write((value+"\n").getBytes());
                    //保证写到磁盘
                    fos.hsync();
                }
                cnt+=records.count();
                System.out.println("消费："+cnt);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try {
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            consumer.close();
        }
    }
}
